package flinkstudy.stream.windows;


import flinkstudy.bo.EstateInfo;
import flinkstudy.stream.environment.FlinkStreamExecutionEnvironment;
import flinkstudy.stream.source.mock.DynamicUnlimitedData;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
import org.junit.Before;
import org.junit.Test;

/**
 * Keyed window examples.
 *
 * <p>General shape of a keyed-window pipeline:
 *
 * <pre>{@code
 * stream
 *        .keyBy(...)               <-  keyed versus non-keyed windows
 *        .window(...)              <-  required: "assigner"
 *       [.trigger(...)]            <-  optional: "trigger" (else default trigger)
 *       [.evictor(...)]            <-  optional: "evictor" (else no evictor)
 *       [.allowedLateness(...)]    <-  optional: "lateness" (else zero)
 *       [.sideOutputLateData(...)] <-  optional: "output tag" (else no side output for late data)
 *        .reduce/aggregate/fold/apply()      <-  required: "function"
 *       [.getSideOutput(...)]      <-  optional: "output tag"
 * }</pre>
 *
 * <p>Commonly used time-window assigner classes are listed below.
 *
 * @see org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows
 * @see org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
 *
 * @author daocr
 * @date 2019/12/19
 */
public class KeyedWindows {

    /** Wrapper exposing the {@link StreamExecutionEnvironment}; a fresh one is created before each test. */
    private FlinkStreamExecutionEnvironment flinkStreamExecutionEnvironment;


    /**
     * Creates a new environment wrapper so that tests do not share one.
     */
    @Before
    public void init() {
        flinkStreamExecutionEnvironment = new FlinkStreamExecutionEnvironment();
    }


    /**
     * Tumbling window.
     * <p>
     * Windows do not overlap: the stream is keyed by {@code cityId} and {@code price} is summed
     * over 10-second windows, then each aggregate is printed as a {@code (cityId, price)} tuple.
     * <p>
     * Per the Flink API documentation, {@code timeWindow(size)} is shorthand for a tumbling
     * event-time or processing-time assigner, depending on the environment's time characteristic.
     *
     * @throws Exception if the Flink job fails
     * @see org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
     * @see org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
     */
    @Test
    public void tumblingTimeWindows() throws Exception {
        StreamExecutionEnvironment streamExecutionEnvironment = flinkStreamExecutionEnvironment.streamExecutionEnvironment;

        // Obtain the data source.
        // NOTE(review): the source name suggests an unbounded mock stream, in which case
        // execute() below would never return on its own -- confirm against DynamicUnlimitedData.
        DataStreamSource<EstateInfo> estateInfoDataStreamSource = streamExecutionEnvironment.addSource(new DynamicUnlimitedData());

        estateInfoDataStreamSource
                // Print every raw record
                .map(new RawRecordLogger())
                // Group by cityId
                .keyBy("cityId")
                // Sum the price over 10-second windows
                .timeWindow(Time.seconds(10))
                .sum("price")
                // Reformat the aggregated record as a (cityId, price) tuple
                .process(new CityPriceFormatter())
                .print()
                .setParallelism(1);

        streamExecutionEnvironment.execute();
    }


    /**
     * Sliding window.
     * <p>
     * Windows overlap: the stream is keyed by {@code cityId} and, every 10 seconds, {@code price}
     * is summed over the preceding minute, then each aggregate is printed as a
     * {@code (cityId, price)} tuple.
     * <p>
     * Per the Flink API documentation, {@code timeWindow(size, slide)} is shorthand for a sliding
     * event-time or processing-time assigner, depending on the environment's time characteristic.
     *
     * @throws Exception if the Flink job fails
     * @see org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows
     * @see org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows
     */
    @Test
    public void slidingTimeWindows() throws Exception {
        StreamExecutionEnvironment streamExecutionEnvironment = flinkStreamExecutionEnvironment.streamExecutionEnvironment;

        // Obtain the data source.
        // NOTE(review): the source name suggests an unbounded mock stream, in which case
        // execute() below would never return on its own -- confirm against DynamicUnlimitedData.
        DataStreamSource<EstateInfo> estateInfoDataStreamSource = streamExecutionEnvironment.addSource(new DynamicUnlimitedData());

        estateInfoDataStreamSource
                // Print every raw record
                .map(new RawRecordLogger())
                // Group by cityId
                .keyBy("cityId")
                // Every 10 seconds, sum the price over the past minute
                .timeWindow(Time.minutes(1), Time.seconds(10))
                .sum("price")
                // Reformat the aggregated record as a (cityId, price) tuple
                .process(new CityPriceFormatter())
                .print()
                .setParallelism(1);

        streamExecutionEnvironment.execute();
    }


    /**
     * Pass-through mapper that prints each incoming record to standard output.
     * <p>
     * Declared as a static nested class so the function object carries no hidden reference
     * to the enclosing test instance, and so both tests share one implementation.
     */
    private static final class RawRecordLogger implements MapFunction<EstateInfo, EstateInfo> {

        private static final long serialVersionUID = 1L;

        /**
         * Prints the record and returns it unchanged.
         *
         * @param value the incoming record
         * @return the same record instance
         */
        @Override
        public EstateInfo map(EstateInfo value) throws Exception {
            System.out.println("原始数据： " + value);
            return value;
        }
    }


    /**
     * Converts an aggregated {@link EstateInfo} into a {@code (cityId, price)} tuple.
     * <p>
     * Declared as a static nested class so the function object carries no hidden reference
     * to the enclosing test instance, and so both tests share one implementation.
     */
    private static final class CityPriceFormatter extends ProcessFunction<EstateInfo, Tuple2<Integer, Long>> {

        private static final long serialVersionUID = 1L;

        /**
         * Emits exactly one tuple per input record.
         *
         * @param value the aggregated record produced by the window sum
         * @param ctx   the process-function context (unused)
         * @param out   the collector receiving the {@code (cityId, price)} tuple
         */
        @Override
        public void processElement(EstateInfo value, Context ctx, Collector<Tuple2<Integer, Long>> out) throws Exception {
            out.collect(Tuple2.of(value.getCityId(), value.getPrice()));
        }
    }

}
